package com.supply.hsa.link.task.fetch.hotel.container;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import com.supply.hsa.link.common.constant.SupplierClass;
import com.supply.hsa.link.task.fetch.CommonConstant;
import com.supply.hsa.link.task.fetch.process.ThreadPoolConfig;
import org.apache.commons.collections.MapUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Service;


@Service
public class ThreadPoolContainer implements ApplicationListener<ContextRefreshedEvent>{
	
	public static final int FETCH_THREAD_POOL_SIZE = 20;
	
	@Autowired
	private SupplyHotelProcessorContainer supplyHotelProcessorContainer;
	
	@Autowired
	private SupplyFetchDataServiceContainer supplyFetchDataServiceContainer;
	
	/**
	 * 数据处理线程池
	 */
	private Map<SupplierClass, ThreadPoolExecutor> analyzeThreadPoolExecutorMap;
	
	/**
	 * 数据查询线程池
	 */
	private Map<SupplierClass, ThreadPoolExecutor> fetchThreadPoolExecutorMap;
	
	/**
	 * 数据处理自启动线程池
	 */
	private Map<SupplierClass, ScheduledExecutorService> supplyAnalyzeAutoStartScheduledExecutor;
	
	@Autowired
	private ThreadPoolConfig threadPoolConfig;
	
	private ReentrantLock fetchThreadPoolLock = new ReentrantLock();

	@Override
	public void onApplicationEvent(ContextRefreshedEvent event) {
		if(event.getApplicationContext().getDisplayName().equals(CommonConstant.SPRING_APPLICATION_CONTEXT_NAME)){
			Set<SupplierClass> registeredSupplier = supplyHotelProcessorContainer.getRegisteredHotelInfoProcessorSupplier();
			analyzeThreadPoolExecutorMap = new HashMap<SupplierClass, ThreadPoolExecutor>(registeredSupplier.size());
			fetchThreadPoolExecutorMap = new HashMap<SupplierClass, ThreadPoolExecutor>(registeredSupplier.size());
			supplyAnalyzeAutoStartScheduledExecutor = new HashMap<SupplierClass, ScheduledExecutorService>(registeredSupplier.size());
		}
	}
	
	public void onRefreshFetchThreadPool(Map<SupplierClass, Integer> threadPoolMap){
		if(MapUtils.isEmpty(threadPoolMap)){
			return;
		}
		
		fetchThreadPoolLock.lock();
		try {
			for(Map.Entry<SupplierClass, Integer> entry : threadPoolMap.entrySet()){
				ThreadPoolExecutor executor = fetchThreadPoolExecutorMap.get(entry.getKey());
				if(executor != null){
					executor.shutdownNow();
					executor = null;
					
					this.createFetchThreadPool(entry.getKey(), entry.getValue());
				}
			}
			
			
		} finally{
			fetchThreadPoolLock.unlock();
		}
	}
	
	public ThreadPoolExecutor getFetchThreadPoolExecutor(SupplierClass supplierClass){
		if (fetchThreadPoolExecutorMap.get(supplierClass) == null) {
			fetchThreadPoolLock.lock();
			try {
				if (fetchThreadPoolExecutorMap.get(supplierClass) == null) {
					Map<SupplierClass, Integer> threadPoolMap = threadPoolConfig.getFetchThreadPool();
					int poolSize = threadPoolMap.get(supplierClass)!=null ? threadPoolMap.get(supplierClass) : FETCH_THREAD_POOL_SIZE;
					this.createFetchThreadPool(supplierClass, poolSize);
				}
			} finally {
				fetchThreadPoolLock.unlock();
			}
		}
		
		return fetchThreadPoolExecutorMap.get(supplierClass);
	}
	
	private void createFetchThreadPool(SupplierClass supplierClass, Integer poolSize){
		ThreadPoolExecutor executor = new ThreadPoolExecutor(
				poolSize, poolSize, 0, TimeUnit.MILLISECONDS,
				new LinkedBlockingQueue<Runnable>());

		fetchThreadPoolExecutorMap.put(supplierClass, executor);
	}
	
	public ThreadPoolExecutor getAnalyzeThreadPoolExecutor(SupplierClass supplierClass){
//		if(analyzeThreadPoolExecutorMap.get(supplierClass) == null){
//			synchronized (this) {
//				if(analyzeThreadPoolExecutorMap.get(supplierClass) == null){
//					List<String> nodeList = this.consistentHash.getNodes();//存储的数据节点
//					int corePool = nodeList!=null ? nodeList.size()+1 : 0;
//					ThreadPoolExecutor executor = new ThreadPoolExecutor(corePool, corePool, 0, TimeUnit.MILLISECONDS,
//							new LinkedBlockingQueue<Runnable>());
//
//					analyzeThreadPoolExecutorMap.put(supplierClass, executor);
//				}
//			}
//		}
		
		return analyzeThreadPoolExecutorMap.get(supplierClass);
	}
	
	public ScheduledExecutorService getAnaylzeScheduledPool(SupplierClass supplierClass){
		if(supplyAnalyzeAutoStartScheduledExecutor.get(supplierClass) == null){
			synchronized (this) {
				if(supplyAnalyzeAutoStartScheduledExecutor.get(supplierClass) == null){
					supplyAnalyzeAutoStartScheduledExecutor.put(supplierClass, Executors.newScheduledThreadPool(1));
				}
			}
		}
		
		return supplyAnalyzeAutoStartScheduledExecutor.get(supplierClass);
	}
	
}
